package com.kool.kmqtt.service;

import com.alibaba.fastjson.JSON;
import com.kool.kmqtt.service.vo.KafkaMsg;
import com.kool.kmqtt.util.DateUtil;
import com.kool.kmqtt.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Spring component that wraps outgoing text in a {@link KafkaMsg} envelope,
 * serializes the envelope to JSON and hands it to the injected {@link KafkaTemplate}.
 *
 * @author : luyu
 * @date :2021/3/31 19:16
 */
@Component
@Slf4j
public class KafkaProvider {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;
    /** Prefix of every destination topic; read from {@code provider_group_id}, falling back to {@code center}. */
    @Value("${provider_group_id:center}")
    private String providerGroupId;
    /** Identifier stamped on every envelope; read from {@code provider_id}, falling back to an empty string. */
    @Value("${provider_id:}")
    private String providerId;

    /**
     * Sends a message to Kafka.
     *
     * <p>The destination topic is {@code providerGroupId + "_" + topicSuffix}. The record value is
     * the JSON form of a {@link KafkaMsg} carrying a newly generated message id, the topic suffix,
     * the current time, the provider id and the given message text. The outcome is observed through
     * a callback registered on the future returned by the template: a failure is logged at error
     * level, a success is ignored.
     *
     * @param topicSuffix suffix appended to the provider group id to form the topic name
     * @param message     text placed in the envelope's message field
     */
    public void sendToKafka(final String topicSuffix, final String message) {
        final KafkaMsg envelope = buildEnvelope(topicSuffix, message);
        final String topic = providerGroupId + "_" + topicSuffix;
        final String payload = JSON.toJSONString(envelope);

        final ListenableFuture<SendResult<String, Object>> pending = kafkaTemplate.send(topic, payload);
        pending.addCallback(failureLoggingCallback());
    }

    /** Creates the envelope for one outgoing message and fills in all of its fields. */
    private KafkaMsg buildEnvelope(final String topicSuffix, final String message) {
        final KafkaMsg envelope = new KafkaMsg();
        envelope.setMsgId(StringUtil.getUUID());
        envelope.setTopicSuffix(topicSuffix);
        envelope.setTime(DateUtil.dateString(new Date()));
        envelope.setProviderId(providerId);
        envelope.setMessage(message);
        return envelope;
    }

    /** Creates a callback that logs a failed send and does nothing on a successful one. */
    private ListenableFutureCallback<SendResult<String, Object>> failureLoggingCallback() {
        return new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                // Nothing to do when the send succeeds.
            }

            @Override
            public void onFailure(Throwable e) {
                log.error("发送至kafka失败，失败原因：" + e.getMessage(), e);
            }
        };
    }
}
